package org.hornetq.core.server.cluster.impl;

import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;

/* loaded from: input_file:WEB-INF/lib/hornetq-core-2.1.2.Final.jar:org/hornetq/core/server/cluster/impl/Redistributor.class */
/**
 * A {@link Consumer} that takes references from a local {@link Queue} and hands their messages to
 * the {@link PostOffice} for redistribution, acknowledging each one in its own transaction.
 *
 * <p>After {@code batchSize} completed redistributions the consumer pauses itself and schedules a
 * {@link Prompter} on the executor; the prompter re-activates it and asks the queue to deliver
 * again, so one caller of {@link #handle(MessageReference)} is not kept busy indefinitely.
 *
 * <p>Thread-safety: {@code active}, {@code stopped} and {@code count} are guarded by this object's
 * monitor. The monitor is never held while waiting for the executor, because the tasks being
 * waited for ({@link Prompter}) need that same monitor to run.
 */
public class Redistributor implements Consumer {
    private static final Logger log = Logger.getLogger(Redistributor.class);

    /** Longest time, in milliseconds, that stop()/close() wait for already-queued executor tasks. */
    private static final long EXECUTOR_FLUSH_TIMEOUT_MILLIS = 10000L;

    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private final Executor executor;
    private final int batchSize;
    private final Queue queue;

    /** True while references are accepted; false before start, during a batch pause, and after stop. Guarded by this. */
    private boolean active;

    /** True once stop()/close() completed its state change; keeps a late Prompter from re-activating. Guarded by this. */
    private boolean stopped;

    /** Number of redistributions completed in the current batch. Guarded by this. */
    private int count;

    /**
     * Executor task that ends a batch pause: re-activates the redistributor and prompts the queue
     * to deliver again. It does nothing when the redistributor was stopped or closed in the
     * meantime.
     *
     * <p>NOTE(review): this type and its enclosing {@code execPrompter()} are only public because
     * the decompiler widened them; the original declarations were private.
     */
    public class Prompter implements Runnable {
        private Prompter() {
        }

        @Override
        public void run() {
            synchronized (Redistributor.this) {
                if (Redistributor.this.stopped) {
                    // A pause may only be resumed while running; never undo stop()/close().
                    return;
                }
                Redistributor.this.active = true;
                Redistributor.this.queue.deliverAsync();
            }
        }
    }

    /**
     * Creates an inactive redistributor; call {@link #start()} to begin accepting references.
     *
     * @param queue          queue whose references are acknowledged and which is prompted to deliver
     * @param storageManager used to create transactions and to be notified of completed operations
     * @param postOffice     performs the actual redistribution of each message
     * @param executor       runs the batch hand-off task and the flush marker used by stop()/close()
     * @param batchSize      number of completed redistributions after which the consumer pauses
     */
    public Redistributor(Queue queue, StorageManager storageManager, PostOffice postOffice, Executor executor, int batchSize) {
        this.queue = queue;
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.executor = executor;
        this.batchSize = batchSize;
    }

    /**
     * @return always {@code null}; this consumer applies no filter
     */
    @Override
    public Filter getFilter() {
        return null;
    }

    /** Activates the redistributor, also clearing any earlier stop()/close(). */
    public synchronized void start() {
        this.stopped = false;
        this.active = true;
    }

    /**
     * Deactivates the redistributor and then waits (up to ten seconds) for tasks already queued on
     * the executor. A timeout is only logged.
     *
     * @throws Exception declared for interface compatibility with existing callers
     */
    public void stop() throws Exception {
        synchronized (this) {
            this.active = false;
            this.stopped = true;
        }
        // Wait without the monitor: a queued Prompter needs it, and holding it here would stall
        // the executor until the timeout expires.
        if (!flushExecutor()) {
            log.warn("Timed out waiting for tasks to complete");
        }
    }

    /**
     * Waits (up to ten seconds) for tasks already queued on the executor and then deactivates the
     * redistributor.
     *
     * @throws IllegalStateException if the executor did not drain in time; in that case the
     *                               active state is left unchanged
     */
    public void close() {
        // Same reasoning as stop(): never wait for the executor while holding the monitor.
        if (!flushExecutor()) {
            throw new IllegalStateException("Timed out waiting for executor to complete");
        }
        synchronized (this) {
            this.active = false;
            this.stopped = true;
        }
    }

    /**
     * Attempts to redistribute one reference.
     *
     * @param messageReference the reference offered by the queue
     * @return {@link HandleStatus#BUSY} when inactive or when the post office declined the
     *         message, otherwise {@link HandleStatus#HANDLED}
     * @throws Exception if redistribution, acknowledgement or commit fails
     */
    @Override
    public synchronized HandleStatus handle(MessageReference messageReference) throws Exception {
        if (!this.active) {
            return HandleStatus.BUSY;
        }
        final TransactionImpl tx = new TransactionImpl(this.storageManager);
        if (!this.postOffice.redistribute(messageReference.getMessage(), this.queue, tx)) {
            return HandleStatus.BUSY;
        }
        doRedistribute(messageReference, tx);
        return HandleStatus.HANDLED;
    }

    /**
     * Marks the reference handled, acknowledges it within {@code transaction}, commits, and
     * registers a callback that advances the batch counter once the storage manager reports
     * completion.
     */
    private void doRedistribute(MessageReference messageReference, Transaction transaction) throws Exception {
        messageReference.handled();
        this.queue.acknowledge(transaction, messageReference);
        transaction.commit();
        this.storageManager.afterCompleteOperations(new IOAsyncTask() {
            @Override
            public void onError(int errorCode, String errorMessage) {
                Redistributor.log.warn("IO Error during redistribution, errorCode = " + errorCode + " message = " + errorMessage);
            }

            @Override
            public void done() {
                // NOTE(review): the calling thread is chosen by the storage manager and is not
                // visible here, hence execPrompter() takes the monitor itself.
                Redistributor.this.execPrompter();
            }
        });
    }

    /**
     * Counts one completed redistribution and, when the batch is full, pauses this consumer and
     * schedules a {@link Prompter} to resume it from the executor.
     *
     * <p>Synchronized because it mutates state that every other method accesses under the monitor.
     */
    public synchronized void execPrompter() {
        this.count++;
        if (this.count == this.batchSize) {
            this.count = 0;
            this.active = false;
            this.executor.execute(new Prompter());
        }
    }

    /**
     * Queues a marker task on the executor and waits for it to run. Must be called without holding
     * this object's monitor.
     *
     * @return {@code true} if the marker ran within the timeout
     */
    private boolean flushExecutor() {
        final Future marker = new Future();
        this.executor.execute(marker);
        return marker.await(EXECUTOR_FLUSH_TIMEOUT_MILLIS);
    }
}
